package com.hkbigdata.transform;

import com.hkbigdata.entry.WaterSensor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;

/**
 * @author liuanbo
 * @creat 2023-03-19-12:58
 * @see 2194550857@qq.com
 */
public class Flink12_TransFrom_Process_keyby {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        List<WaterSensor> waterSensors = Arrays.asList(
                new WaterSensor("ws_001", 1607527992000L, 20),
                new WaterSensor("ws_001", 1607527992001L, 22),
                new WaterSensor("ws_001", 1607527992002L, 33),
                new WaterSensor("ws_002", 1607527992003L, 44),
                new WaterSensor("ws_002", 1607527992004L, 22),
                new WaterSensor("ws_002", 1607527992001L, 50)
        );

        SingleOutputStreamOperator<Tuple2<String, Integer>> process = env.fromCollection(waterSensors)
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, Tuple2<String, Integer>>() {
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                        out.collect(new Tuple2<>("key 是" + ctx.getCurrentKey(), value.getVs()));
                    }
                });
        process.print();

        env.execute();
    }
}
